This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/sbp by this push:
     new 525fe16  Fix function ordering
525fe16 is described below

commit 525fe1610c037c7e2a05a7ee3a18a3f44be9fe1c
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Feb 3 16:11:45 2026 +0000

    Fix function ordering
---
 atr/get/sbom.py            | 578 ++++++++++++++++++++++-----------------------
 atr/shared/distribution.py | 333 +++++++++++++-------------
 atr/shared/ignores.py      |  52 ++--
 atr/util.py                |   2 +-
 tests/unit/test_cache.py   |  22 +-
 tests/unit/test_ldap.py    |   8 +-
 tests/unit/test_mail.py    | 282 +++++++++++-----------
 tests/unit/test_user.py    |  44 ++--
 tests/unit/test_vote.py    | 180 +++++++-------
 9 files changed, 750 insertions(+), 751 deletions(-)

diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index fc56c59..9a16a47 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -121,168 +121,6 @@ async def report(session: web.Committer, project: str, version: str, file_path:
     return await template.blank("SBOM report", content=block.collect())
 
 
-async def _fetch_tasks(
-    file_path: str, project: str, release: sql.Release, version: str
-) -> tuple[sql.Task | None, Sequence[sql.Task], Sequence[sql.Task]]:
-    # TODO: Abstract this code and the sbomtool.MissingAdapter validators
-    async with db.session() as data:
-        via = sql.validate_instrumented_attribute
-        tasks = (
-            await data.task(
-                project_name=project,
-                version_name=version,
-                revision_number=release.latest_revision_number,
-                task_type=sql.TaskType.SBOM_TOOL_SCORE,
-                primary_rel_path=file_path,
-            )
-            .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
-            .all()
-        )
-        augment_tasks = (
-            await data.task(
-                project_name=project,
-                version_name=version,
-                task_type=sql.TaskType.SBOM_AUGMENT,
-                primary_rel_path=file_path,
-            )
-            .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
-            .all()
-        )
-        # Run or running scans for the current revision
-        osv_tasks = (
-            await data.task(
-                project_name=project,
-                version_name=version,
-                task_type=sql.TaskType.SBOM_OSV_SCAN,
-                primary_rel_path=file_path,
-                revision_number=release.latest_revision_number,
-            )
-            .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
-            .all()
-        )
-        return (tasks[0] if (len(tasks) > 0) else None), augment_tasks, 
osv_tasks
-
-
-def _outdated_tool_section(block: htm.Block, task_result: 
results.SBOMToolScore):
-    block.h2["Outdated tools"]
-    if task_result.outdated:
-        outdated = []
-        if isinstance(task_result.outdated, str):
-            # Older version, only checked one tool
-            outdated = 
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(task_result.outdated))]
-        elif isinstance(task_result.outdated, list):
-            # Newer version, checked multiple tools
-            outdated = 
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(o)) for o in 
task_result.outdated]
-        if len(outdated) == 0:
-            block.p["No outdated tools found."]
-        for result in outdated:
-            if result.kind == "tool":
-                if "Apache Trusted Releases" in result.name:
-                    block.p[
-                        f"""The last version of ATR used on this SBOM was
-                            {result.used_version} but ATR is currently version
-                            {result.available_version}."""
-                    ]
-                else:
-                    block.p[
-                        f"""The {result.name} is outdated. The used version is
-                            {result.used_version} and the available version is
-                            {result.available_version}."""
-                    ]
-            else:
-                if (result.kind == "missing_metadata") or (result.kind == 
"missing_timestamp"):
-                    # These both return without checking any further tools as 
they prevent checking
-                    block.p[
-                        f"""There was a problem with the SBOM detected when trying to
-                            determine if any tools were outdated:
-                            {result.kind.upper()}."""
-                    ]
-                else:
-                    block.p[
-                        f"""There was a problem with the SBOM detected when trying to
-                            determine if the {result.name} is outdated:
-                            {result.kind.upper()}."""
-                    ]
-    else:
-        block.p["No outdated tools found."]
-
-
-def _conformance_section(block: htm.Block, task_result: results.SBOMToolScore) 
-> None:
-    block.h2["Conformance report"]
-    warnings = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in 
task_result.warnings]
-    errors = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in 
task_result.errors]
-    if warnings:
-        block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"), 
"Warnings"]
-        _missing_table(block, warnings)
-
-    if errors:
-        block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
-        _missing_table(block, errors)
-
-    if not (warnings or errors):
-        block.p["No NTIA 2021 minimum data field conformance warnings or errors found."]
-
-
-def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> 
None:
-    block.h2["Licenses"]
-    warnings = []
-    errors = []
-    prev_licenses = None
-    if task_result.prev_licenses is not None:
-        prev_licenses = _load_license_issues(task_result.prev_licenses)
-    if task_result.license_warnings is not None:
-        warnings = _load_license_issues(task_result.license_warnings)
-    if task_result.license_errors is not None:
-        errors = _load_license_issues(task_result.license_errors)
-    # TODO: Rework the rendering of these since category in the table is 
redundant.
-    if warnings:
-        block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"), 
"Warnings"]
-        _license_table(block, warnings, prev_licenses)
-
-    if errors:
-        block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
-        _license_table(block, errors, prev_licenses)
-
-    if not (warnings or errors):
-        block.p["No license warnings or errors found."]
-
-
-def _load_license_issues(issues: list[str]) -> 
list[sbom.models.licenses.Issue]:
-    return [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in 
issues]
-
-
-def _report_header(
-    block: htm.Block, is_release_candidate: bool, release: sql.Release, 
task_result: results.SBOMToolScore
-) -> None:
-    block.p[
-        """This is a report by the ATR SBOM tool, for debugging and
-        informational purposes. Please use it only as an approximate
-        guideline to the quality of your SBOM file."""
-    ]
-    if not is_release_candidate:
-        block.p[
-            "This report is for revision ", 
htm.code[task_result.revision_number], "."
-        ]  # TODO: Mark if a subsequent score has failed
-    elif release.phase == sql.ReleasePhase.RELEASE_CANDIDATE:
-        block.p[f"This report is for the latest {release.version} release candidate."]
-
-
-async def _report_task_results(block: htm.Block, task: sql.Task | None):
-    if task is None:
-        block.p["No SBOM score found."]
-        return await template.blank("SBOM report", content=block.collect())
-
-    task_status = task.status
-    task_error = task.error
-    if (task_status == sql.TaskStatus.QUEUED) or (task_status == 
sql.TaskStatus.ACTIVE):
-        block.p["SBOM score is being computed."]
-        return await template.blank("SBOM report", content=block.collect())
-
-    if task_status == sql.TaskStatus.FAILED:
-        block.p[f"SBOM score task failed: {task_error}"]
-        return await template.blank("SBOM report", content=block.collect())
-
-
 def _augment_section(
     block: htm.Block,
     release: sql.Release,
@@ -329,6 +167,41 @@ def _augment_section(
             )
 
 
+def _cdx_to_osv(cdx: results.CdxVulnerabilityDetail) -> 
results.VulnerabilityDetails:
+    score = []
+    severity = ""
+    if cdx.ratings is not None:
+        severity, score = sbom.utilities.cdx_severity_to_osv(cdx.ratings)
+    return results.VulnerabilityDetails(
+        id=cdx.id,
+        summary=cdx.description,
+        details=cdx.detail,
+        modified=cdx.updated or "",
+        published=cdx.published,
+        severity=score,
+        database_specific={"severity": severity},
+        references=[{"type": "WEB", "url": a.get("url", "")} for a in 
cdx.advisories]
+        if (cdx.advisories is not None)
+        else [],
+    )
+
+
+def _conformance_section(block: htm.Block, task_result: results.SBOMToolScore) 
-> None:
+    block.h2["Conformance report"]
+    warnings = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in 
task_result.warnings]
+    errors = 
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in 
task_result.errors]
+    if warnings:
+        block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"), 
"Warnings"]
+        _missing_table(block, warnings)
+
+    if errors:
+        block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
+        _missing_table(block, errors)
+
+    if not (warnings or errors):
+        block.p["No NTIA 2021 minimum data field conformance warnings or errors found."]
+
+
 def _cyclonedx_cli_errors(block: htm.Block, task_result: results.SBOMToolScore):
     block.h2["CycloneDX CLI validation errors"]
     if task_result.cli_errors:
@@ -337,6 +210,12 @@ def _cyclonedx_cli_errors(block: htm.Block, task_result: results.SBOMToolScore):
         block.p["No CycloneDX CLI validation errors found."]
 
 
+def _detail_table(components: list[str | None]):
+    return htm.table(".table.table-sm.table-bordered.table-striped")[
+        htm.tbody[[htm.tr[htm.td[comp]] for comp in components if comp is not 
None]],
+    ]
+
+
 def _extract_vulnerability_severity(vuln: results.VulnerabilityDetails) -> str:
     """Extract severity information from vulnerability data."""
     data = vuln.database_specific or {}
@@ -352,6 +231,72 @@ def _extract_vulnerability_severity(vuln: results.VulnerabilityDetails) -> str:
     return "Unknown"
 
 
+async def _fetch_tasks(
+    file_path: str, project: str, release: sql.Release, version: str
+) -> tuple[sql.Task | None, Sequence[sql.Task], Sequence[sql.Task]]:
+    # TODO: Abstract this code and the sbomtool.MissingAdapter validators
+    async with db.session() as data:
+        via = sql.validate_instrumented_attribute
+        tasks = (
+            await data.task(
+                project_name=project,
+                version_name=version,
+                revision_number=release.latest_revision_number,
+                task_type=sql.TaskType.SBOM_TOOL_SCORE,
+                primary_rel_path=file_path,
+            )
+            .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
+            .all()
+        )
+        augment_tasks = (
+            await data.task(
+                project_name=project,
+                version_name=version,
+                task_type=sql.TaskType.SBOM_AUGMENT,
+                primary_rel_path=file_path,
+            )
+            .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
+            .all()
+        )
+        # Run or running scans for the current revision
+        osv_tasks = (
+            await data.task(
+                project_name=project,
+                version_name=version,
+                task_type=sql.TaskType.SBOM_OSV_SCAN,
+                primary_rel_path=file_path,
+                revision_number=release.latest_revision_number,
+            )
+            .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
+            .all()
+        )
+        return (tasks[0] if (len(tasks) > 0) else None), augment_tasks, 
osv_tasks
+
+
+def _license_section(block: htm.Block, task_result: results.SBOMToolScore) -> 
None:
+    block.h2["Licenses"]
+    warnings = []
+    errors = []
+    prev_licenses = None
+    if task_result.prev_licenses is not None:
+        prev_licenses = _load_license_issues(task_result.prev_licenses)
+    if task_result.license_warnings is not None:
+        warnings = _load_license_issues(task_result.license_warnings)
+    if task_result.license_errors is not None:
+        errors = _load_license_issues(task_result.license_errors)
+    # TODO: Rework the rendering of these since category in the table is 
redundant.
+    if warnings:
+        block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"), 
"Warnings"]
+        _license_table(block, warnings, prev_licenses)
+
+    if errors:
+        block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
+        _license_table(block, errors, prev_licenses)
+
+    if not (warnings or errors):
+        block.p["No license warnings or errors found."]
+
+
 def _license_table(
     block: htm.Block,
     items: list[sbom.models.licenses.Issue],
@@ -360,59 +305,18 @@ def _license_table(
     warning_rows = [
         htm.tr[
             htm.td[
-                f"Category {category!s}"
-                if (len(components) == 0)
-                else htm.details[htm.summary[f"Category {category!s}"], 
htm.div[_detail_table(components)]]
-            ],
-            htm.td[f"{count!s} {f'({new!s} new, {updated!s} changed)' if (new 
or updated) else ''}"],
-        ]
-        for category, count, new, updated, components in _license_tally(items, 
prev)
-    ]
-    block.table(".table.table-sm.table-bordered.table-striped")[
-        htm.thead[htm.tr[htm.th["License Category"], htm.th["Count"]]],
-        htm.tbody[*warning_rows],
-    ]
-
-
-def _missing_table(block: htm.Block, items: 
list[sbom.models.conformance.Missing]) -> None:
-    warning_rows = [
-        htm.tr[
-            htm.td[
-                kind.upper()
-                if (len(components) == 0)
-                else htm.details[htm.summary[kind.upper()], 
htm.div[_detail_table(components)]]
-            ],
-            htm.td[prop],
-            htm.td[str(count)],
-        ]
-        for kind, prop, count, components in _missing_tally(items)
-    ]
-    block.table(".table.table-sm.table-bordered.table-striped")[
-        htm.thead[htm.tr[htm.th["Kind"], htm.th["Property"], htm.th["Count"]]],
-        htm.tbody[*warning_rows],
-    ]
-
-
-def _detail_table(components: list[str | None]):
-    return htm.table(".table.table-sm.table-bordered.table-striped")[
-        htm.tbody[[htm.tr[htm.td[comp]] for comp in components if comp is not 
None]],
-    ]
-
-
-def _missing_tally(items: list[sbom.models.conformance.Missing]) -> 
list[tuple[str, str, int, list[str | None]]]:
-    counts: dict[tuple[str, str], int] = {}
-    components: dict[tuple[str, str], list[str | None]] = {}
-    for item in items:
-        key = (getattr(item, "kind", ""), getattr(getattr(item, "property", 
None), "name", ""))
-        counts[key] = counts.get(key, 0) + 1
-        if key not in components:
-            components[key] = [str(item)]
-        elif item.kind == "missing_component_property":
-            components[key].append(str(item))
-    return sorted(
-        [(item, prop, count, components.get((item, prop), [])) for (item, 
prop), count in counts.items()],
-        key=lambda kv: (kv[0], kv[1]),
-    )
+                f"Category {category!s}"
+                if (len(components) == 0)
+                else htm.details[htm.summary[f"Category {category!s}"], 
htm.div[_detail_table(components)]]
+            ],
+            htm.td[f"{count!s} {f'({new!s} new, {updated!s} changed)' if (new 
or updated) else ''}"],
+        ]
+        for category, count, new, updated, components in _license_tally(items, 
prev)
+    ]
+    block.table(".table.table-sm.table-bordered.table-striped")[
+        htm.thead[htm.tr[htm.th["License Category"], htm.th["Count"]]],
+        htm.tbody[*warning_rows],
+    ]
 
 
 # TODO: Update this to return either a block or something we can use later in a block for styling reasons
@@ -457,6 +361,121 @@ def _license_tally(
     )
 
 
+def _load_license_issues(issues: list[str]) -> 
list[sbom.models.licenses.Issue]:
+    return [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in 
issues]
+
+
+def _missing_table(block: htm.Block, items: 
list[sbom.models.conformance.Missing]) -> None:
+    warning_rows = [
+        htm.tr[
+            htm.td[
+                kind.upper()
+                if (len(components) == 0)
+                else htm.details[htm.summary[kind.upper()], 
htm.div[_detail_table(components)]]
+            ],
+            htm.td[prop],
+            htm.td[str(count)],
+        ]
+        for kind, prop, count, components in _missing_tally(items)
+    ]
+    block.table(".table.table-sm.table-bordered.table-striped")[
+        htm.thead[htm.tr[htm.th["Kind"], htm.th["Property"], htm.th["Count"]]],
+        htm.tbody[*warning_rows],
+    ]
+
+
+def _missing_tally(items: list[sbom.models.conformance.Missing]) -> 
list[tuple[str, str, int, list[str | None]]]:
+    counts: dict[tuple[str, str], int] = {}
+    components: dict[tuple[str, str], list[str | None]] = {}
+    for item in items:
+        key = (getattr(item, "kind", ""), getattr(getattr(item, "property", 
None), "name", ""))
+        counts[key] = counts.get(key, 0) + 1
+        if key not in components:
+            components[key] = [str(item)]
+        elif item.kind == "missing_component_property":
+            components[key].append(str(item))
+    return sorted(
+        [(item, prop, count, components.get((item, prop), [])) for (item, 
prop), count in counts.items()],
+        key=lambda kv: (kv[0], kv[1]),
+    )
+
+
+def _outdated_tool_section(block: htm.Block, task_result: 
results.SBOMToolScore):
+    block.h2["Outdated tools"]
+    if task_result.outdated:
+        outdated = []
+        if isinstance(task_result.outdated, str):
+            # Older version, only checked one tool
+            outdated = 
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(task_result.outdated))]
+        elif isinstance(task_result.outdated, list):
+            # Newer version, checked multiple tools
+            outdated = 
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(o)) for o in 
task_result.outdated]
+        if len(outdated) == 0:
+            block.p["No outdated tools found."]
+        for result in outdated:
+            if result.kind == "tool":
+                if "Apache Trusted Releases" in result.name:
+                    block.p[
+                        f"""The last version of ATR used on this SBOM was
+                            {result.used_version} but ATR is currently version
+                            {result.available_version}."""
+                    ]
+                else:
+                    block.p[
+                        f"""The {result.name} is outdated. The used version is
+                            {result.used_version} and the available version is
+                            {result.available_version}."""
+                    ]
+            else:
+                if (result.kind == "missing_metadata") or (result.kind == 
"missing_timestamp"):
+                    # These both return without checking any further tools as 
they prevent checking
+                    block.p[
+                        f"""There was a problem with the SBOM detected when trying to
+                            determine if any tools were outdated:
+                            {result.kind.upper()}."""
+                    ]
+                else:
+                    block.p[
+                        f"""There was a problem with the SBOM detected when trying to
+                            determine if the {result.name} is outdated:
+                            {result.kind.upper()}."""
+                    ]
+    else:
+        block.p["No outdated tools found."]
+
+
+def _report_header(
+    block: htm.Block, is_release_candidate: bool, release: sql.Release, 
task_result: results.SBOMToolScore
+) -> None:
+    block.p[
+        """This is a report by the ATR SBOM tool, for debugging and
+        informational purposes. Please use it only as an approximate
+        guideline to the quality of your SBOM file."""
+    ]
+    if not is_release_candidate:
+        block.p[
+            "This report is for revision ", 
htm.code[task_result.revision_number], "."
+        ]  # TODO: Mark if a subsequent score has failed
+    elif release.phase == sql.ReleasePhase.RELEASE_CANDIDATE:
+        block.p[f"This report is for the latest {release.version} release candidate."]
+
+
+async def _report_task_results(block: htm.Block, task: sql.Task | None):
+    if task is None:
+        block.p["No SBOM score found."]
+        return await template.blank("SBOM report", content=block.collect())
+
+    task_status = task.status
+    task_error = task.error
+    if (task_status == sql.TaskStatus.QUEUED) or (task_status == 
sql.TaskStatus.ACTIVE):
+        block.p["SBOM score is being computed."]
+        return await template.blank("SBOM report", content=block.collect())
+
+    if task_status == sql.TaskStatus.FAILED:
+        block.p[f"SBOM score task failed: {task_error}"]
+        return await template.blank("SBOM report", content=block.collect())
+
+
 def _severity_to_style(severity: str) -> str:
     match severity.lower():
         case "critical":
@@ -474,6 +493,15 @@ def _severity_to_style(severity: str) -> str:
     return ".bg-info.text-light"
 
 
+def _update_worst_severity(severities: list[str], vuln_severity: str, worst: 
int) -> int:
+    try:
+        sev_index = severities.index(vuln_severity)
+    except ValueError:
+        sev_index = 99
+    worst = min(worst, sev_index)
+    return worst
+
+
 def _vulnerability_component_details_osv(
     block: htm.Block,
     component: results.OSVComponent,
@@ -546,64 +574,6 @@ def _vulnerability_component_details_osv(
     return new
 
 
-def _update_worst_severity(severities: list[str], vuln_severity: str, worst: 
int) -> int:
-    try:
-        sev_index = severities.index(vuln_severity)
-    except ValueError:
-        sev_index = 99
-    worst = min(worst, sev_index)
-    return worst
-
-
-def _vulnerability_scan_button(block: htm.Block) -> None:
-    block.p["You can perform a new vulnerability scan."]
-
-    form.render_block(
-        block,
-        model_cls=shared.sbom.ScanSBOMForm,
-        submit_label="Scan file",
-        empty=True,
-    )
-
-
-def _vulnerability_scan_find_completed_task(osv_tasks: Sequence[sql.Task], 
revision_number: str) -> sql.Task | None:
-    """Find the most recent completed OSV scan task for the given revision."""
-    for task in osv_tasks:
-        if (task.status == sql.TaskStatus.COMPLETED) and (task.result is not 
None):
-            task_result = task.result
-            if isinstance(task_result, results.SBOMOSVScan) and 
(task_result.revision_number == revision_number):
-                return task
-    return None
-
-
-def _vulnerability_scan_find_in_progress_task(osv_tasks: Sequence[sql.Task], 
revision_number: str) -> sql.Task | None:
-    """Find the most recent in-progress OSV scan task for the given revision."""
-    for task in osv_tasks:
-        if task.revision_number == revision_number:
-            if task.status in (sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE, 
sql.TaskStatus.FAILED):
-                return task
-    return None
-
-
-def _vulnerability_scan_results(
-    block: htm.Block,
-    vulns: list[results.CdxVulnerabilityDetail],
-    scans: list[str],
-    task: sql.Task | None,
-    prev: list[results.CdxVulnerabilityDetail] | None,
-) -> None:
-    previous_vulns = None
-    if prev is not None:
-        previous_osv = [
-            (_cdx_to_osv(v), [a.get("ref", "") for a in v.affects] if 
(v.affects is not None) else []) for v in prev
-        ]
-        previous_vulns = {v.id: (_extract_vulnerability_severity(v), a) for v, 
a in previous_osv}
-    if task is not None:
-        _vulnerability_results_from_scan(task, block, previous_vulns)
-    else:
-        _vulnerability_results_from_bom(vulns, block, scans, previous_vulns)
-
-
 def _vulnerability_results_from_bom(
     vulns: list[results.CdxVulnerabilityDetail],
     block: htm.Block,
@@ -676,25 +646,55 @@ def _vulnerability_results_from_scan(
     block.append(new_block)
 
 
-def _cdx_to_osv(cdx: results.CdxVulnerabilityDetail) -> 
results.VulnerabilityDetails:
-    score = []
-    severity = ""
-    if cdx.ratings is not None:
-        severity, score = sbom.utilities.cdx_severity_to_osv(cdx.ratings)
-    return results.VulnerabilityDetails(
-        id=cdx.id,
-        summary=cdx.description,
-        details=cdx.detail,
-        modified=cdx.updated or "",
-        published=cdx.published,
-        severity=score,
-        database_specific={"severity": severity},
-        references=[{"type": "WEB", "url": a.get("url", "")} for a in 
cdx.advisories]
-        if (cdx.advisories is not None)
-        else [],
+def _vulnerability_scan_button(block: htm.Block) -> None:
+    block.p["You can perform a new vulnerability scan."]
+
+    form.render_block(
+        block,
+        model_cls=shared.sbom.ScanSBOMForm,
+        submit_label="Scan file",
+        empty=True,
     )
 
 
+def _vulnerability_scan_find_completed_task(osv_tasks: Sequence[sql.Task], 
revision_number: str) -> sql.Task | None:
+    """Find the most recent completed OSV scan task for the given revision."""
+    for task in osv_tasks:
+        if (task.status == sql.TaskStatus.COMPLETED) and (task.result is not 
None):
+            task_result = task.result
+            if isinstance(task_result, results.SBOMOSVScan) and 
(task_result.revision_number == revision_number):
+                return task
+    return None
+
+
+def _vulnerability_scan_find_in_progress_task(osv_tasks: Sequence[sql.Task], 
revision_number: str) -> sql.Task | None:
+    """Find the most recent in-progress OSV scan task for the given revision."""
+    for task in osv_tasks:
+        if task.revision_number == revision_number:
+            if task.status in (sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE, 
sql.TaskStatus.FAILED):
+                return task
+    return None
+
+
+def _vulnerability_scan_results(
+    block: htm.Block,
+    vulns: list[results.CdxVulnerabilityDetail],
+    scans: list[str],
+    task: sql.Task | None,
+    prev: list[results.CdxVulnerabilityDetail] | None,
+) -> None:
+    previous_vulns = None
+    if prev is not None:
+        previous_osv = [
+            (_cdx_to_osv(v), [a.get("ref", "") for a in v.affects] if 
(v.affects is not None) else []) for v in prev
+        ]
+        previous_vulns = {v.id: (_extract_vulnerability_severity(v), a) for v, 
a in previous_osv}
+    if task is not None:
+        _vulnerability_results_from_scan(task, block, previous_vulns)
+    else:
+        _vulnerability_results_from_bom(vulns, block, scans, previous_vulns)
+
+
 def _vulnerability_scan_section(
     block: htm.Block,
     project: str,
diff --git a/atr/shared/distribution.py b/atr/shared/distribution.py
index 6e146c2..023bed3 100644
--- a/atr/shared/distribution.py
+++ b/atr/shared/distribution.py
@@ -32,6 +32,38 @@ import atr.models.sql as sql
 import atr.util as util
 from atr.storage import outcome
 
+# async def __json_from_maven_cdn(
+#     self, api_url: str, group_id: str, artifact_id: str, version: str
+# ) -> outcome.Outcome[models.basic.JSON]:
+#     import datetime
+#
+#     try:
+#         async with util.create_secure_session() as session:
+#             async with session.get(api_url) as response:
+#                 response.raise_for_status()
+#
+#         # Use current time as timestamp since we're just validating the package exists
+#         timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() * 1000)
+#
+#         # Convert to dict matching MavenResponse structure
+#         result_dict = {
+#             "response": {
+#                 "start": 0,
+#                 "docs": [
+#                     {
+#                         "g": group_id,
+#                         "a": artifact_id,
+#                         "v": version,
+#                         "timestamp": timestamp_ms,
+#                     }
+#                 ],
+#             }
+#         }
+#         result = models.basic.as_json(result_dict)
+#         return outcome.Result(result)
+#     except aiohttp.ClientError as e:
+#         return outcome.Error(e)
+
 
 class DistributionError(RuntimeError): ...
 
@@ -121,173 +153,6 @@ class DistributeForm(form.Form):
         return self
 
 
-def html_submitted_values_table(block: htm.Block, dd: distribution.Data) -> 
None:
-    tbody = htm.tbody[
-        html_tr("Platform", dd.platform.name),
-        html_tr("Owner or Namespace", dd.owner_namespace or "-"),
-        html_tr("Package", dd.package),
-        html_tr("Version", dd.version),
-    ]
-    block.table(".table.table-striped.table-bordered")[tbody]
-
-
-def html_tr(label: str, value: str) -> htm.Element:
-    return htm.tr[htm.th[label], htm.td[value]]
-
-
-def html_tr_a(label: str, value: str | None) -> htm.Element:
-    return htm.tr[htm.th[label], htm.td[htm.a(href=value)[value] if value else 
"-"]]
-
-
-async def json_from_distribution_platform(
-    api_url: str, platform: sql.DistributionPlatform, version: str
-) -> outcome.Outcome[basic.JSON]:
-    try:
-        async with util.create_secure_session() as session:
-            async with session.get(api_url) as response:
-                response.raise_for_status()
-                response_json = await response.json()
-        result = basic.as_json(response_json)
-    except aiohttp.ClientError as e:
-        return outcome.Error(e)
-    match platform:
-        case sql.DistributionPlatform.NPM | 
sql.DistributionPlatform.NPM_SCOPED:
-            if version not in 
distribution.NpmResponse.model_validate(result).time:
-                e = DistributionError(f"Version '{version}' not found")
-                return outcome.Error(e)
-    return outcome.Result(result)
-
-
-async def release_validated(
-    project: str, version: str, committee: bool = False, staging: bool | None 
= None, release_policy: bool = False
-) -> sql.Release:
-    match staging:
-        case True:
-            phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT}
-        case False:
-            phase = {sql.ReleasePhase.RELEASE_PREVIEW}
-        case None:
-            phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT, 
sql.ReleasePhase.RELEASE_PREVIEW}
-    async with db.session() as data:
-        release = await data.release(
-            project_name=project,
-            version=version,
-            _committee=committee,
-            _release_policy=release_policy,
-        ).demand(RuntimeError(f"Release {project} {version} not found"))
-        if release.phase not in phase:
-            raise RuntimeError(f"Release {project} {version} is not in {phase}")
-        # if release.project.status != sql.ProjectStatus.ACTIVE:
-        #     raise RuntimeError(f"Project {project} is not active")
-    return release
-
-
-async def release_validated_and_committee(
-    project: str, version: str, *, staging: bool | None = None, 
release_policy: bool = False
-) -> tuple[sql.Release, sql.Committee]:
-    release = await release_validated(project, version, committee=True, 
staging=staging, release_policy=release_policy)
-    committee = release.committee
-    if committee is None:
-        raise RuntimeError(f"Release {project} {version} has no committee")
-    return release, committee
-
-
-# async def __json_from_maven_cdn(
-#     self, api_url: str, group_id: str, artifact_id: str, version: str
-# ) -> outcome.Outcome[models.basic.JSON]:
-#     import datetime
-#
-#     try:
-#         async with util.create_secure_session() as session:
-#             async with session.get(api_url) as response:
-#                 response.raise_for_status()
-#
-#         # Use current time as timestamp since we're just validating the package exists
-#         timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() * 1000)
-#
-#         # Convert to dict matching MavenResponse structure
-#         result_dict = {
-#             "response": {
-#                 "start": 0,
-#                 "docs": [
-#                     {
-#                         "g": group_id,
-#                         "a": artifact_id,
-#                         "v": version,
-#                         "timestamp": timestamp_ms,
-#                     }
-#                 ],
-#             }
-#         }
-#         result = models.basic.as_json(result_dict)
-#         return outcome.Result(result)
-#     except aiohttp.ClientError as e:
-#         return outcome.Error(e)
-
-
-async def json_from_maven_xml(api_url: str, version: str) -> outcome.Outcome[basic.JSON]:
-    import datetime
-    import xml.etree.ElementTree as ET
-
-    try:
-        async with util.create_secure_session() as session:
-            async with session.get(api_url) as response:
-                response.raise_for_status()
-                xml_text = await response.text()
-
-        # Parse the XML
-        root = ET.fromstring(xml_text)
-
-        # Extract versioning info
-        group = root.find("groupId")
-        artifact = root.find("artifactId")
-        versioning = root.find("versioning")
-        if versioning is None:
-            e = DistributionError("No versioning element found in Maven metadata")
-            return outcome.Error(e)
-
-        # Get lastUpdated timestamp (format: yyyyMMddHHmmss)
-        last_updated_elem = versioning.find("lastUpdated")
-        if (last_updated_elem is None) or (not last_updated_elem.text):
-            e = DistributionError("No lastUpdated timestamp found in Maven metadata")
-            return outcome.Error(e)
-
-        # Convert lastUpdated string to Unix timestamp in milliseconds
-        last_updated_str = last_updated_elem.text
-        dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S")
-        dt = dt.replace(tzinfo=datetime.UTC)
-        timestamp_ms = int(dt.timestamp() * 1000)
-
-        # Verify the version exists
-        versions_elem = versioning.find("versions")
-        if versions_elem is not None:
-            versions = [v.text for v in versions_elem.findall("version") if v.text]
-            if version not in versions:
-                e = DistributionError(f"Version '{version}' not found in Maven metadata")
-                return outcome.Error(e)
-
-        # Convert to dict matching MavenResponse structure
-        result_dict = {
-            "response": {
-                "start": 0,
-                "docs": [
-                    {
-                        "g": group.text if (group is not None) else "",
-                        "a": artifact.text if (artifact is not None) else "",
-                        "v": version,
-                        "timestamp": timestamp_ms,
-                    }
-                ],
-            }
-        }
-        result = basic.as_json(result_dict)
-        return outcome.Result(result)
-    except (aiohttp.ClientError, DistributionError) as e:
-        return outcome.Error(e)
-    except ET.ParseError as e:
-        return outcome.Error(RuntimeError(f"Failed to parse Maven XML: {e}"))
-
-
 def distribution_upload_date(  # noqa: C901
     platform: sql.DistributionPlatform,
     data: basic.JSON,
@@ -393,6 +258,140 @@ def get_api_url(dd: distribution.Data, staging: bool | None = None):
     return api_url
 
 
+def html_submitted_values_table(block: htm.Block, dd: distribution.Data) -> None:
+    tbody = htm.tbody[
+        html_tr("Platform", dd.platform.name),
+        html_tr("Owner or Namespace", dd.owner_namespace or "-"),
+        html_tr("Package", dd.package),
+        html_tr("Version", dd.version),
+    ]
+    block.table(".table.table-striped.table-bordered")[tbody]
+
+
+def html_tr(label: str, value: str) -> htm.Element:
+    return htm.tr[htm.th[label], htm.td[value]]
+
+
+def html_tr_a(label: str, value: str | None) -> htm.Element:
+    return htm.tr[htm.th[label], htm.td[htm.a(href=value)[value] if value else "-"]]
+
+
+async def json_from_distribution_platform(
+    api_url: str, platform: sql.DistributionPlatform, version: str
+) -> outcome.Outcome[basic.JSON]:
+    try:
+        async with util.create_secure_session() as session:
+            async with session.get(api_url) as response:
+                response.raise_for_status()
+                response_json = await response.json()
+        result = basic.as_json(response_json)
+    except aiohttp.ClientError as e:
+        return outcome.Error(e)
+    match platform:
+        case sql.DistributionPlatform.NPM | sql.DistributionPlatform.NPM_SCOPED:
+            if version not in distribution.NpmResponse.model_validate(result).time:
+                e = DistributionError(f"Version '{version}' not found")
+                return outcome.Error(e)
+    return outcome.Result(result)
+
+
+async def json_from_maven_xml(api_url: str, version: str) -> outcome.Outcome[basic.JSON]:
+    import datetime
+    import xml.etree.ElementTree as ET
+
+    try:
+        async with util.create_secure_session() as session:
+            async with session.get(api_url) as response:
+                response.raise_for_status()
+                xml_text = await response.text()
+
+        # Parse the XML
+        root = ET.fromstring(xml_text)
+
+        # Extract versioning info
+        group = root.find("groupId")
+        artifact = root.find("artifactId")
+        versioning = root.find("versioning")
+        if versioning is None:
+            e = DistributionError("No versioning element found in Maven metadata")
+            return outcome.Error(e)
+
+        # Get lastUpdated timestamp (format: yyyyMMddHHmmss)
+        last_updated_elem = versioning.find("lastUpdated")
+        if (last_updated_elem is None) or (not last_updated_elem.text):
+            e = DistributionError("No lastUpdated timestamp found in Maven metadata")
+            return outcome.Error(e)
+
+        # Convert lastUpdated string to Unix timestamp in milliseconds
+        last_updated_str = last_updated_elem.text
+        dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S")
+        dt = dt.replace(tzinfo=datetime.UTC)
+        timestamp_ms = int(dt.timestamp() * 1000)
+
+        # Verify the version exists
+        versions_elem = versioning.find("versions")
+        if versions_elem is not None:
+            versions = [v.text for v in versions_elem.findall("version") if v.text]
+            if version not in versions:
+                e = DistributionError(f"Version '{version}' not found in Maven metadata")
+                return outcome.Error(e)
+
+        # Convert to dict matching MavenResponse structure
+        result_dict = {
+            "response": {
+                "start": 0,
+                "docs": [
+                    {
+                        "g": group.text if (group is not None) else "",
+                        "a": artifact.text if (artifact is not None) else "",
+                        "v": version,
+                        "timestamp": timestamp_ms,
+                    }
+                ],
+            }
+        }
+        result = basic.as_json(result_dict)
+        return outcome.Result(result)
+    except (aiohttp.ClientError, DistributionError) as e:
+        return outcome.Error(e)
+    except ET.ParseError as e:
+        return outcome.Error(RuntimeError(f"Failed to parse Maven XML: {e}"))
+
+
+async def release_validated(
+    project: str, version: str, committee: bool = False, staging: bool | None = None, release_policy: bool = False
+) -> sql.Release:
+    match staging:
+        case True:
+            phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT}
+        case False:
+            phase = {sql.ReleasePhase.RELEASE_PREVIEW}
+        case None:
+            phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT, sql.ReleasePhase.RELEASE_PREVIEW}
+    async with db.session() as data:
+        release = await data.release(
+            project_name=project,
+            version=version,
+            _committee=committee,
+            _release_policy=release_policy,
+        ).demand(RuntimeError(f"Release {project} {version} not found"))
+        if release.phase not in phase:
+            raise RuntimeError(f"Release {project} {version} is not in {phase}")
+        # if release.project.status != sql.ProjectStatus.ACTIVE:
+        #     raise RuntimeError(f"Project {project} is not active")
+    return release
+
+
+async def release_validated_and_committee(
+    project: str, version: str, *, staging: bool | None = None, release_policy: bool = False
+) -> tuple[sql.Release, sql.Committee]:
+    release = await release_validated(project, version, committee=True, staging=staging, release_policy=release_policy)
+    committee = release.committee
+    if committee is None:
+        raise RuntimeError(f"Release {project} {version} has no committee")
+    return release, committee
+
+
 def _template_url(
     dd: distribution.Data,
     staging: bool | None = None,
diff --git a/atr/shared/ignores.py b/atr/shared/ignores.py
index 347a133..2b41f66 100644
--- a/atr/shared/ignores.py
+++ b/atr/shared/ignores.py
@@ -40,32 +40,6 @@ class IgnoreStatus(enum.Enum):
     WARNING = "Warning"
 
 
-def ignore_status_to_sql(status: IgnoreStatus | None) -> sql.CheckResultStatusIgnore | None:
-    """Convert wrapper enum to SQL enum."""
-    if (status is None) or (status == IgnoreStatus.NO_STATUS):
-        return None
-    match status:
-        case IgnoreStatus.EXCEPTION:
-            return sql.CheckResultStatusIgnore.EXCEPTION
-        case IgnoreStatus.FAILURE:
-            return sql.CheckResultStatusIgnore.FAILURE
-        case IgnoreStatus.WARNING:
-            return sql.CheckResultStatusIgnore.WARNING
-
-
-def sql_to_ignore_status(status: sql.CheckResultStatusIgnore | None) -> IgnoreStatus:
-    """Convert SQL enum to wrapper enum."""
-    if status is None:
-        return IgnoreStatus.NO_STATUS
-    match status:
-        case sql.CheckResultStatusIgnore.EXCEPTION:
-            return IgnoreStatus.EXCEPTION
-        case sql.CheckResultStatusIgnore.FAILURE:
-            return IgnoreStatus.FAILURE
-        case sql.CheckResultStatusIgnore.WARNING:
-            return IgnoreStatus.WARNING
-
-
 class AddIgnoreForm(form.Form):
     variant: ADD = form.value(ADD)
     release_glob: str = form.label("Release pattern", default="")
@@ -154,6 +128,32 @@ type IgnoreForm = Annotated[
 ]
 
 
+def ignore_status_to_sql(status: IgnoreStatus | None) -> sql.CheckResultStatusIgnore | None:
+    """Convert wrapper enum to SQL enum."""
+    if (status is None) or (status == IgnoreStatus.NO_STATUS):
+        return None
+    match status:
+        case IgnoreStatus.EXCEPTION:
+            return sql.CheckResultStatusIgnore.EXCEPTION
+        case IgnoreStatus.FAILURE:
+            return sql.CheckResultStatusIgnore.FAILURE
+        case IgnoreStatus.WARNING:
+            return sql.CheckResultStatusIgnore.WARNING
+
+
+def sql_to_ignore_status(status: sql.CheckResultStatusIgnore | None) -> IgnoreStatus:
+    """Convert SQL enum to wrapper enum."""
+    if status is None:
+        return IgnoreStatus.NO_STATUS
+    match status:
+        case sql.CheckResultStatusIgnore.EXCEPTION:
+            return IgnoreStatus.EXCEPTION
+        case sql.CheckResultStatusIgnore.FAILURE:
+            return IgnoreStatus.FAILURE
+        case sql.CheckResultStatusIgnore.WARNING:
+            return IgnoreStatus.WARNING
+
+
 def _validate_ignore_form_patterns(*patterns: str) -> None:
     for pattern in patterns:
         if not pattern:
diff --git a/atr/util.py b/atr/util.py
index f573995..8d19a6b 100644
--- a/atr/util.py
+++ b/atr/util.py
@@ -279,7 +279,7 @@ def cookie_pmcs_or_session_pmcs(session_data: session.ClientSession) -> list[str
     pmcs = cookie_pmcs()
     if pmcs is None:
         pmcs = session_data.committees
-    if not isinstance(pmcs, list) or (not (all(isinstance(item, str) for item in pmcs))):
+    if (not isinstance(pmcs, list)) or (not (all(isinstance(item, str) for item in pmcs))):
         raise TypeError("Session pmcs must be a list[str]")
     return pmcs
 
diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py
index 9990c8b..a165778 100644
--- a/tests/unit/test_cache.py
+++ b/tests/unit/test_cache.py
@@ -29,26 +29,26 @@ if TYPE_CHECKING:
     from pytest import MonkeyPatch
 
 
-class _MockApp:
+class MockApp:
     def __init__(self):
         self.extensions: dict[str, object] = {}
 
 
-class _MockConfig:
+class MockConfig:
     def __init__(self, state_dir: pathlib.Path):
         self.STATE_DIR = str(state_dir)
 
 
 @pytest.fixture
-def mock_app(monkeypatch: "MonkeyPatch") -> _MockApp:
-    app = _MockApp()
+def mock_app(monkeypatch: "MonkeyPatch") -> MockApp:
+    app = MockApp()
     monkeypatch.setattr("asfquart.APP", app)
     return app
 
 
 @pytest.fixture
 def state_dir(tmp_path: pathlib.Path, monkeypatch: "MonkeyPatch") -> pathlib.Path:
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig(tmp_path))
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig(tmp_path))
     return tmp_path
 
 
@@ -109,18 +109,18 @@ async def test_admins_get_async_falls_back_to_file_without_app_context(
 
 
 @pytest.mark.asyncio
-async def test_admins_get_async_uses_extensions_when_available(mock_app: _MockApp):
+async def test_admins_get_async_uses_extensions_when_available(mock_app: MockApp):
     mock_app.extensions["admins"] = frozenset({"async_alice"})
     result = await cache.admins_get_async()
     assert result == frozenset({"async_alice"})
 
 
-def test_admins_get_returns_empty_frozenset_when_not_set(mock_app: _MockApp):
+def test_admins_get_returns_empty_frozenset_when_not_set(mock_app: MockApp):
     result = cache.admins_get()
     assert result == frozenset()
 
 
-def test_admins_get_returns_frozenset_from_extensions(mock_app: _MockApp):
+def test_admins_get_returns_frozenset_from_extensions(mock_app: MockApp):
     mock_app.extensions["admins"] = frozenset({"alice", "bob"})
     result = cache.admins_get()
     assert result == frozenset({"alice", "bob"})
@@ -178,7 +178,7 @@ async def test_admins_save_to_file_creates_parent_dirs(state_dir: pathlib.Path):
 
 @pytest.mark.asyncio
 async def test_admins_startup_load_calls_ldap_when_cache_missing(
-    state_dir: pathlib.Path, mock_app: _MockApp, monkeypatch: "MonkeyPatch"
+    state_dir: pathlib.Path, mock_app: MockApp, monkeypatch: "MonkeyPatch"
 ):
     ldap_called = False
 
@@ -199,7 +199,7 @@ async def test_admins_startup_load_calls_ldap_when_cache_missing(
 
 @pytest.mark.asyncio
 async def test_admins_startup_load_handles_ldap_failure(
-    state_dir: pathlib.Path, mock_app: _MockApp, monkeypatch: "MonkeyPatch"
+    state_dir: pathlib.Path, mock_app: MockApp, monkeypatch: "MonkeyPatch"
 ):
     async def mock_fetch_admin_users() -> frozenset[str]:
         raise ConnectionError("LDAP server unavailable")
@@ -215,7 +215,7 @@ async def test_admins_startup_load_handles_ldap_failure(
 
 @pytest.mark.asyncio
 async def test_admins_startup_load_uses_cache_when_present(
-    state_dir: pathlib.Path, mock_app: _MockApp, monkeypatch: "MonkeyPatch"
+    state_dir: pathlib.Path, mock_app: MockApp, monkeypatch: "MonkeyPatch"
 ):
     ldap_called = False
 
diff --git a/tests/unit/test_ldap.py b/tests/unit/test_ldap.py
index 43fdf8a..15f8844 100644
--- a/tests/unit/test_ldap.py
+++ b/tests/unit/test_ldap.py
@@ -27,12 +27,12 @@ if TYPE_CHECKING:
     from pytest import MonkeyPatch
 
 
-class _MockApp:
+class MockApp:
     def __init__(self):
         self.extensions: dict[str, object] = {}
 
 
-class _MockConfig:
+class MockConfig:
     def __init__(self, state_dir: pathlib.Path, ldap_bind_dn: str | None, ldap_bind_password: str | None):
         self.STATE_DIR = str(state_dir)
         self.LDAP_BIND_DN = ldap_bind_dn
@@ -53,10 +53,10 @@ async def test_admins_startup_load_fetches_real_admins(
     import atr.config as config
 
     real_config = config.get()
-    mock_config = _MockConfig(tmp_path, real_config.LDAP_BIND_DN, real_config.LDAP_BIND_PASSWORD)
+    mock_config = MockConfig(tmp_path, real_config.LDAP_BIND_DN, real_config.LDAP_BIND_PASSWORD)
     monkeypatch.setattr("atr.config.get", lambda: mock_config)
 
-    mock_app = _MockApp()
+    mock_app = MockApp()
     monkeypatch.setattr("asfquart.APP", mock_app)
 
     await cache.admins_startup_load()
diff --git a/tests/unit/test_mail.py b/tests/unit/test_mail.py
index 2f5f5e6..39a8339 100644
--- a/tests/unit/test_mail.py
+++ b/tests/unit/test_mail.py
@@ -31,107 +31,31 @@ if TYPE_CHECKING:
 
 
 @pytest.mark.asyncio
-async def test_send_rejects_crlf_in_subject(monkeypatch: "MonkeyPatch") -> None:
-    """Test that CRLF injection in subject field is rejected."""
-    # Mock _send_many to ensure we never actually send emails
+async def test_address_objects_used_for_from_to_headers(monkeypatch: "MonkeyPatch") -> None:
+    """Test that Address objects are used for From/To headers."""
     mock_send_many = AsyncMock(return_value=[])
     monkeypatch.setattr("atr.mail._send_many", mock_send_many)
 
-    # Create a malicious message with CRLF in the subject
-    malicious_message = mail.Message(
+    legitimate_message = mail.Message(
         email_sender="[email protected]",
         email_recipient="[email protected]",
-        subject="Legitimate Subject\r\nBcc: [email protected]",
-        body="This is a test message",
-    )
-
-    # Call send and expect it to catch the injection
-    _, errors = await mail.send(malicious_message)
-
-    # Assert that the function returned an error
-    assert len(errors) == 1
-    assert "CRLF injection detected" in errors[0]
-
-    # Assert that _send_many was never called (email was not sent)
-    mock_send_many.assert_not_called()
-
-
[email protected]
-async def test_send_rejects_crlf_in_from_address(monkeypatch: "MonkeyPatch") -> None:
-    """Test that CRLF injection in from address field is rejected.
-
-    Note: The from_addr validation happens before EmailMessage processing,
-    so this test verifies the early validation layer also protects against injection.
-    """
-    mock_send_many = AsyncMock(return_value=[])
-    monkeypatch.setattr("atr.mail._send_many", mock_send_many)
-
-    # Create a malicious message with CRLF in the from address
-    malicious_message = mail.Message(
-        email_sender="[email protected]\r\nBcc: [email protected]",
-        email_recipient="[email protected]",
         subject="Test Subject",
-        body="This is a test message",
-    )
-
-    # Call send and expect it to raise ValueError due to invalid from_addr format
-    with pytest.raises(ValueError, match=r"from_addr must end with @apache.org"):
-        await mail.send(malicious_message)
-
-    # Assert that _send_many was never called
-    mock_send_many.assert_not_called()
-
-
[email protected]
-async def test_send_rejects_crlf_in_to_address(monkeypatch: "MonkeyPatch") -> None:
-    """Test that CRLF injection in to address field is rejected.
-
-    Note: The _validate_recipient check happens before EmailMessage processing,
-    so this test verifies the early validation layer also protects against injection.
-    """
-    mock_send_many = AsyncMock(return_value=[])
-    monkeypatch.setattr("atr.mail._send_many", mock_send_many)
-
-    # Create a malicious message with CRLF in the to address
-    malicious_message = mail.Message(
-        email_sender="[email protected]",
-        email_recipient="[email protected]\r\nBcc: [email protected]",
-        subject="Test Subject",
-        body="This is a test message",
+        body="Test body",
     )
 
-    # Call send and expect it to raise ValueError due to invalid recipient format
-    with pytest.raises(ValueError, match=r"Email recipient must be @apache.org"):
-        await mail.send(malicious_message)
-
-    # Assert that _send_many was never called
-    mock_send_many.assert_not_called()
-
-
[email protected]
-async def test_send_rejects_crlf_in_reply_to(monkeypatch: "MonkeyPatch") -> None:
-    """Test that CRLF injection in in_reply_to field is rejected."""
-    mock_send_many = AsyncMock(return_value=[])
-    monkeypatch.setattr("atr.mail._send_many", mock_send_many)
-
-    # Create a malicious message with CRLF in the in_reply_to field
-    malicious_message = mail.Message(
-        email_sender="[email protected]",
-        email_recipient="[email protected]",
-        subject="Test Subject",
-        body="This is a test message",
-        in_reply_to="[email protected]\r\nBcc: [email protected]",
-    )
+    _, errors = await mail.send(legitimate_message)
 
-    # Call send and expect it to catch the injection
-    _, errors = await mail.send(malicious_message)
+    # Verify the message was sent successfully
+    assert len(errors) == 0
+    mock_send_many.assert_called_once()
 
-    # Assert that the function returned an error
-    assert len(errors) == 1
-    assert "CRLF injection detected" in errors[0]
+    # Verify the generated email bytes contain properly formatted addresses
+    call_args = mock_send_many.call_args
+    msg_text = call_args[0][2]  # already a str
 
-    # Assert that _send_many was never called
-    mock_send_many.assert_not_called()
+    # Address objects format email addresses properly
+    assert "From: [email protected]" in msg_text
+    assert "To: [email protected]" in msg_text
 
 
 @pytest.mark.asyncio
@@ -196,17 +120,72 @@ async def test_send_accepts_message_with_reply_to(monkeypatch: "MonkeyPatch") ->
 
 
 @pytest.mark.asyncio
-async def test_send_rejects_lf_only_injection(monkeypatch: "MonkeyPatch") -> None:
-    """Test that injection with LF only (\\n) is also rejected."""
+async def test_send_handles_non_ascii_headers(monkeypatch: "MonkeyPatch") -> None:
+    """Test that non-ASCII characters in headers are handled correctly."""
     mock_send_many = AsyncMock(return_value=[])
     monkeypatch.setattr("atr.mail._send_many", mock_send_many)
 
-    # Create a malicious message with just LF (no CR)
+    # Create a message with non-ASCII characters in the subject
+    message_with_unicode = mail.Message(
+        email_sender="[email protected]",
+        email_recipient="[email protected]",
+        subject="Test avec Accént",
+        body="This message has non-ASCII characters in the subject.",
+    )
+
+    # Call send
+    _mid, errors = await mail.send(message_with_unicode)
+
+    # Assert that no errors were returned
+    assert len(errors) == 0
+
+    # Assert that _send_many was called with a string (not bytes)
+    mock_send_many.assert_called_once()
+    call_args = mock_send_many.call_args
+    msg_text = call_args[0][2]  # Third argument should be str
+    assert isinstance(msg_text, str)
+
+    # Verify the subject is present in the message
+    assert "Subject: Test avec Accént" in msg_text
+
+
[email protected]
+async def test_send_rejects_bcc_header_injection(monkeypatch: "MonkeyPatch") -> None:
+    """Test a realistic Bcc header injection attack scenario."""
+    mock_send_many = AsyncMock(return_value=[])
+    monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+    # Create a malicious message attempting to inject a Bcc header
     malicious_message = mail.Message(
         email_sender="[email protected]",
         email_recipient="[email protected]",
-        subject="Legitimate Subject\nBcc: [email protected]",
-        body="This is a test message",
+        subject="Important Notice\r\nBcc: [email protected]\r\nX-Priority: 1",
+        body="This message attempts to secretly copy an attacker.",
+    )
+
+    # Call send and expect it to catch the injection
+    _, errors = await mail.send(malicious_message)
+
+    # Assert that the function returned an error
+    assert len(errors) == 1
+    assert "CRLF injection detected" in errors[0]
+
+    # Assert that _send_many was never called
+    mock_send_many.assert_not_called()
+
+
[email protected]
+async def test_send_rejects_content_type_injection(monkeypatch: "MonkeyPatch") -> None:
+    """Test injection attempting to override Content-Type header."""
+    mock_send_many = AsyncMock(return_value=[])
+    monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+    # Create a malicious message attempting to inject Content-Type
+    malicious_message = mail.Message(
+        email_sender="[email protected]",
+        email_recipient="[email protected]",
+        subject="Test\r\nContent-Type: text/html\r\n\r\n<html><script>alert('XSS')</script></html>",
+        body="Normal body",
     )
 
     # Call send and expect it to catch the injection
@@ -246,17 +225,44 @@ async def test_send_rejects_cr_only_injection(monkeypatch: "MonkeyPatch") -> Non
 
 
 @pytest.mark.asyncio
-async def test_send_rejects_bcc_header_injection(monkeypatch: "MonkeyPatch") -> None:
-    """Test a realistic Bcc header injection attack scenario."""
+async def test_send_rejects_crlf_in_from_address(monkeypatch: "MonkeyPatch") -> None:
+    """Test that CRLF injection in from address field is rejected.
+
+    Note: The from_addr validation happens before EmailMessage processing,
+    so this test verifies the early validation layer also protects against injection.
+    """
     mock_send_many = AsyncMock(return_value=[])
     monkeypatch.setattr("atr.mail._send_many", mock_send_many)
 
-    # Create a malicious message attempting to inject a Bcc header
+    # Create a malicious message with CRLF in the from address
+    malicious_message = mail.Message(
+        email_sender="[email protected]\r\nBcc: [email protected]",
+        email_recipient="[email protected]",
+        subject="Test Subject",
+        body="This is a test message",
+    )
+
+    # Call send and expect it to raise ValueError due to invalid from_addr format
+    with pytest.raises(ValueError, match=r"from_addr must end with @apache.org"):
+        await mail.send(malicious_message)
+
+    # Assert that _send_many was never called
+    mock_send_many.assert_not_called()
+
+
[email protected]
+async def test_send_rejects_crlf_in_reply_to(monkeypatch: "MonkeyPatch") -> None:
+    """Test that CRLF injection in in_reply_to field is rejected."""
+    mock_send_many = AsyncMock(return_value=[])
+    monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+    # Create a malicious message with CRLF in the in_reply_to field
     malicious_message = mail.Message(
         email_sender="[email protected]",
         email_recipient="[email protected]",
-        subject="Important Notice\r\nBcc: [email protected]\r\nX-Priority: 1",
-        body="This message attempts to secretly copy an attacker.",
+        subject="Test Subject",
+        body="This is a test message",
+        in_reply_to="[email protected]\r\nBcc: [email protected]",
     )
 
     # Call send and expect it to catch the injection
@@ -271,17 +277,18 @@ async def test_send_rejects_bcc_header_injection(monkeypatch: "MonkeyPatch") ->
 
 
 @pytest.mark.asyncio
-async def test_send_rejects_content_type_injection(monkeypatch: "MonkeyPatch") -> None:
-    """Test injection attempting to override Content-Type header."""
+async def test_send_rejects_crlf_in_subject(monkeypatch: "MonkeyPatch") -> None:
+    """Test that CRLF injection in subject field is rejected."""
+    # Mock _send_many to ensure we never actually send emails
     mock_send_many = AsyncMock(return_value=[])
     monkeypatch.setattr("atr.mail._send_many", mock_send_many)
 
-    # Create a malicious message attempting to inject Content-Type
+    # Create a malicious message with CRLF in the subject
     malicious_message = mail.Message(
         email_sender="[email protected]",
         email_recipient="[email protected]",
-        subject="Test\r\nContent-Type: text/html\r\n\r\n<html><script>alert('XSS')</script></html>",
-        body="Normal body",
+        subject="Legitimate Subject\r\nBcc: [email protected]",
+        body="This is a test message",
     )
 
     # Call send and expect it to catch the injection
@@ -291,66 +298,59 @@ async def test_send_rejects_content_type_injection(monkeypatch: "MonkeyPatch") -
     assert len(errors) == 1
     assert "CRLF injection detected" in errors[0]
 
-    # Assert that _send_many was never called
+    # Assert that _send_many was never called (email was not sent)
     mock_send_many.assert_not_called()
 
 
 @pytest.mark.asyncio
-async def test_address_objects_used_for_from_to_headers(monkeypatch: "MonkeyPatch") -> None:
-    """Test that Address objects are used for From/To headers."""
+async def test_send_rejects_crlf_in_to_address(monkeypatch: "MonkeyPatch") -> None:
+    """Test that CRLF injection in to address field is rejected.
+
+    Note: The _validate_recipient check happens before EmailMessage processing,
+    so this test verifies the early validation layer also protects against injection.
+    """
     mock_send_many = AsyncMock(return_value=[])
     monkeypatch.setattr("atr.mail._send_many", mock_send_many)
 
-    legitimate_message = mail.Message(
+    # Create a malicious message with CRLF in the to address
+    malicious_message = mail.Message(
         email_sender="[email protected]",
-        email_recipient="[email protected]",
+        email_recipient="[email protected]\r\nBcc: [email protected]",
         subject="Test Subject",
-        body="Test body",
+        body="This is a test message",
     )
 
-    _, errors = await mail.send(legitimate_message)
-
-    # Verify the message was sent successfully
-    assert len(errors) == 0
-    mock_send_many.assert_called_once()
-
-    # Verify the generated email bytes contain properly formatted addresses
-    call_args = mock_send_many.call_args
-    msg_text = call_args[0][2]  # already a str
+    # Call send and expect it to raise ValueError due to invalid recipient format
+    with pytest.raises(ValueError, match=r"Email recipient must be @apache.org"):
+        await mail.send(malicious_message)
 
-    # Address objects format email addresses properly
-    assert "From: [email protected]" in msg_text
-    assert "To: [email protected]" in msg_text
+    # Assert that _send_many was never called
+    mock_send_many.assert_not_called()
 
 
 @pytest.mark.asyncio
-async def test_send_handles_non_ascii_headers(monkeypatch: "MonkeyPatch") -> None:
-    """Test that non-ASCII characters in headers are handled correctly."""
+async def test_send_rejects_lf_only_injection(monkeypatch: "MonkeyPatch") -> None:
+    """Test that injection with LF only (\\n) is also rejected."""
     mock_send_many = AsyncMock(return_value=[])
     monkeypatch.setattr("atr.mail._send_many", mock_send_many)
 
-    # Create a message with non-ASCII characters in the subject
-    message_with_unicode = mail.Message(
+    # Create a malicious message with just LF (no CR)
+    malicious_message = mail.Message(
         email_sender="[email protected]",
         email_recipient="[email protected]",
-        subject="Test avec Accént",
-        body="This message has non-ASCII characters in the subject.",
+        subject="Legitimate Subject\nBcc: [email protected]",
+        body="This is a test message",
     )
 
-    # Call send
-    _mid, errors = await mail.send(message_with_unicode)
-
-    # Assert that no errors were returned
-    assert len(errors) == 0
+    # Call send and expect it to catch the injection
+    _, errors = await mail.send(malicious_message)
 
-    # Assert that _send_many was called with a string (not bytes)
-    mock_send_many.assert_called_once()
-    call_args = mock_send_many.call_args
-    msg_text = call_args[0][2]  # Third argument should be str
-    assert isinstance(msg_text, str)
+    # Assert that the function returned an error
+    assert len(errors) == 1
+    assert "CRLF injection detected" in errors[0]
 
-    # Verify the subject is present in the message
-    assert "Subject: Test avec Accént" in msg_text
+    # Assert that _send_many was never called
+    mock_send_many.assert_not_called()
 
 
 def test_smtp_policy_vs_smtputf8() -> None:
diff --git a/tests/unit/test_user.py b/tests/unit/test_user.py
index 268b22e..23c93ce 100644
--- a/tests/unit/test_user.py
+++ b/tests/unit/test_user.py
@@ -25,83 +25,83 @@ if TYPE_CHECKING:
     from pytest import MonkeyPatch
 
 
-class _MockApp:
+class MockApp:
     def __init__(self):
         self.extensions: dict[str, object] = {}
 
 
-class _MockConfig:
+class MockConfig:
     def __init__(self, allow_tests: bool = False, admin_users_additional: str = ""):
         self.ALLOW_TESTS = allow_tests
         self.ADMIN_USERS_ADDITIONAL = admin_users_additional
 
 
 @pytest.fixture
-def mock_app(monkeypatch: "MonkeyPatch") -> _MockApp:
-    app = _MockApp()
+def mock_app(monkeypatch: "MonkeyPatch") -> MockApp:
+    app = MockApp()
     monkeypatch.setattr("asfquart.APP", app)
     return app
 
 
 @pytest.mark.asyncio
-async def test_is_admin_async_returns_false_for_none(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+async def test_is_admin_async_returns_false_for_none(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig())
     mock_app.extensions["admins"] = frozenset()
     assert await user.is_admin_async(None) is False
 
 
 @pytest.mark.asyncio
-async def test_is_admin_async_returns_true_for_cached_admin(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
+async def test_is_admin_async_returns_true_for_cached_admin(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
     user._get_additional_admin_users.cache_clear()
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig())
     mock_app.extensions["admins"] = frozenset({"async_admin"})
     assert await user.is_admin_async("async_admin") is True
 
 
 @pytest.mark.asyncio
-async def test_is_admin_async_returns_true_for_test_user(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
+async def test_is_admin_async_returns_true_for_test_user(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
     user._get_additional_admin_users.cache_clear()
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig(allow_tests=True))
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig(allow_tests=True))
     mock_app.extensions["admins"] = frozenset()
     assert await user.is_admin_async("test") is True
 
 
-def test_is_admin_returns_false_for_none(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+def test_is_admin_returns_false_for_none(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig())
     mock_app.extensions["admins"] = frozenset()
     assert user.is_admin(None) is False
 
 
-def test_is_admin_returns_false_for_test_user_when_not_allowed(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_false_for_test_user_when_not_allowed(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
     user._get_additional_admin_users.cache_clear()
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig(allow_tests=False))
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig(allow_tests=False))
     mock_app.extensions["admins"] = frozenset()
     assert user.is_admin("test") is False
 
 
-def test_is_admin_returns_false_for_unknown_user(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+def test_is_admin_returns_false_for_unknown_user(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig())
     mock_app.extensions["admins"] = frozenset({"alice", "bob"})
     assert user.is_admin("nobody") is False
 
 
-def test_is_admin_returns_true_for_additional_admin(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_true_for_additional_admin(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
     user._get_additional_admin_users.cache_clear()
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig(admin_users_additional="alice,bob"))
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig(admin_users_additional="alice,bob"))
     mock_app.extensions["admins"] = frozenset()
     assert user.is_admin("alice") is True
     assert user.is_admin("bob") is True
 
 
-def test_is_admin_returns_true_for_cached_admin(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_true_for_cached_admin(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
     user._get_additional_admin_users.cache_clear()
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig())
     mock_app.extensions["admins"] = frozenset({"cached_admin"})
     assert user.is_admin("cached_admin") is True
 
 
-def test_is_admin_returns_true_for_test_user_when_allowed(mock_app: _MockApp, monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_true_for_test_user_when_allowed(mock_app: MockApp, monkeypatch: "MonkeyPatch"):
     user._get_additional_admin_users.cache_clear()
-    monkeypatch.setattr("atr.config.get", lambda: _MockConfig(allow_tests=True))
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig(allow_tests=True))
     mock_app.extensions["admins"] = frozenset()
     assert user.is_admin("test") is True
diff --git a/tests/unit/test_vote.py b/tests/unit/test_vote.py
index 7d7da0e..f204c9e 100644
--- a/tests/unit/test_vote.py
+++ b/tests/unit/test_vote.py
@@ -56,6 +56,27 @@ def mock_app(monkeypatch: MonkeyPatch) -> MockApp:
     return app
 
 
+def test_admin_not_on_pmc_has_non_binding_vote(mock_app: MockApp, monkeypatch: MonkeyPatch) -> None:
+    """Admins who are not PMC members do not get binding votes."""
+    user._get_additional_admin_users.cache_clear()
+    monkeypatch.setattr("atr.config.get", lambda: MockConfig())
+    mock_app.extensions["admins"] = frozenset({"admin_user"})
+
+    assert user.is_admin("admin_user") is True
+
+    committee = MockCommittee(
+        committee_members=["alice", "bob"],
+        committers=["alice", "bob", "charlie"],
+    )
+    # admin_user is not on this PMC
+    is_pmc_member = user.is_committee_member(committee, "admin_user")  # type: ignore[arg-type]
+    assert is_pmc_member is False
+
+    # Binding is determined ONLY by PMC membership, not admin status
+    is_binding = is_pmc_member
+    assert is_binding is False
+
+
 def test_binding_vote_body_format() -> None:
     """Binding votes include '(binding)' in the email body."""
     body = vote_writer.format_vote_email_body(
@@ -67,78 +88,55 @@ def test_binding_vote_body_format() -> None:
     assert body == "+1 (binding) (pmcmember) PMC Member"
 
 
-def test_non_binding_vote_body_format() -> None:
-    """Non-binding votes do not include '(binding)' in the email body."""
+def test_binding_vote_with_comment() -> None:
+    """Binding votes with comments include the comment and signature."""
     body = vote_writer.format_vote_email_body(
         vote="+1",
-        asf_uid="contributor",
-        fullname="A Contributor",
-        is_binding=False,
-    )
-    assert body == "+1 (contributor) A Contributor"
-
-
-def test_negative_binding_vote_body_format() -> None:
-    """Negative binding votes are formatted correctly."""
-    body = vote_writer.format_vote_email_body(
-        vote="-1",
         asf_uid="pmcmember",
         fullname="PMC Member",
         is_binding=True,
+        comment="Verified signatures and checksums. Tests pass.",
     )
-    assert body == "-1 (binding) (pmcmember) PMC Member"
-
-
-def test_zero_binding_vote_body_format() -> None:
-    """Zero binding votes are formatted correctly."""
-    body = vote_writer.format_vote_email_body(
-        vote="0",
-        asf_uid="voter",
-        fullname="Abstaining Voter",
-        is_binding=True,
+    expected = (
+        "+1 (binding) (pmcmember) PMC Member\n\n"
+        "Verified signatures and checksums. Tests pass.\n\n"
+        "-- \nPMC Member (pmcmember)"
     )
-    assert body == "0 (binding) (voter) Abstaining Voter"
+    assert body == expected
 
 
-def test_zero_non_binding_vote_body_format() -> None:
-    """Zero non-binding votes are formatted correctly."""
-    body = vote_writer.format_vote_email_body(
-        vote="0",
-        asf_uid="voter",
-        fullname="Abstaining Voter",
-        is_binding=False,
+def test_committer_non_pmc_has_non_binding_vote() -> None:
+    """Committers who are not PMC members have non-binding votes."""
+    committee = MockCommittee(
+        committee_members=["alice", "bob", "charlie"],
+        committers=["alice", "bob", "charlie", "dave", "eve"],
     )
-    assert body == "0 (voter) Abstaining Voter"
+    is_binding = user.is_committee_member(committee, "dave")  # type: ignore[arg-type]
+    assert is_binding is False
 
 
-def test_binding_vote_with_comment() -> None:
-    """Binding votes with comments include the comment and signature."""
+def test_empty_comment_no_signature() -> None:
+    """Empty comments do not add a signature."""
     body = vote_writer.format_vote_email_body(
         vote="+1",
         asf_uid="pmcmember",
         fullname="PMC Member",
         is_binding=True,
-        comment="Verified signatures and checksums. Tests pass.",
-    )
-    expected = (
-        "+1 (binding) (pmcmember) PMC Member\n\n"
-        "Verified signatures and checksums. Tests pass.\n\n"
-        "-- \nPMC Member (pmcmember)"
+        comment="",
     )
-    assert body == expected
+    assert body == "+1 (binding) (pmcmember) PMC Member"
+    assert "-- \n" not in body
 
 
-def test_non_binding_vote_with_comment() -> None:
-    """Non-binding votes with comments include the comment and signature."""
+def test_negative_binding_vote_body_format() -> None:
+    """Negative binding votes are formatted correctly."""
     body = vote_writer.format_vote_email_body(
-        vote="+1",
-        asf_uid="contributor",
-        fullname="A Contributor",
-        is_binding=False,
-        comment="Looks good to me!",
+        vote="-1",
+        asf_uid="pmcmember",
+        fullname="PMC Member",
+        is_binding=True,
     )
-    expected = "+1 (contributor) A Contributor\n\nLooks good to me!\n\n-- \nA Contributor (contributor)"
-    assert body == expected
+    assert body == "-1 (binding) (pmcmember) PMC Member"
 
 
 def test_negative_vote_with_comment() -> None:
@@ -158,37 +156,28 @@ def test_negative_vote_with_comment() -> None:
     assert body == expected
 
 
-def test_empty_comment_no_signature() -> None:
-    """Empty comments do not add a signature."""
+def test_non_binding_vote_body_format() -> None:
+    """Non-binding votes do not include '(binding)' in the email body."""
     body = vote_writer.format_vote_email_body(
         vote="+1",
-        asf_uid="pmcmember",
-        fullname="PMC Member",
-        is_binding=True,
-        comment="",
-    )
-    assert body == "+1 (binding) (pmcmember) PMC Member"
-    assert "-- \n" not in body
-
-
-def test_pmc_member_has_binding_vote() -> None:
-    """PMC members have binding votes."""
-    committee = MockCommittee(
-        committee_members=["alice", "bob", "charlie"],
-        committers=["alice", "bob", "charlie", "dave", "eve"],
+        asf_uid="contributor",
+        fullname="A Contributor",
+        is_binding=False,
     )
-    is_binding = user.is_committee_member(committee, "alice")  # type: ignore[arg-type]
-    assert is_binding is True
+    assert body == "+1 (contributor) A Contributor"
 
 
-def test_committer_non_pmc_has_non_binding_vote() -> None:
-    """Committers who are not PMC members have non-binding votes."""
-    committee = MockCommittee(
-        committee_members=["alice", "bob", "charlie"],
-        committers=["alice", "bob", "charlie", "dave", "eve"],
+def test_non_binding_vote_with_comment() -> None:
+    """Non-binding votes with comments include the comment and signature."""
+    body = vote_writer.format_vote_email_body(
+        vote="+1",
+        asf_uid="contributor",
+        fullname="A Contributor",
+        is_binding=False,
+        comment="Looks good to me!",
     )
-    is_binding = user.is_committee_member(committee, "dave")  # type: ignore[arg-type]
-    assert is_binding is False
+    expected = "+1 (contributor) A Contributor\n\nLooks good to me!\n\n-- \nA Contributor (contributor)"
+    assert body == expected
 
 
 def test_non_committer_has_non_binding_vote() -> None:
@@ -207,22 +196,33 @@ def test_none_committee_returns_false() -> None:
     assert is_binding is False
 
 
-def test_admin_not_on_pmc_has_non_binding_vote(mock_app: MockApp, monkeypatch: MonkeyPatch) -> None:
-    """Admins who are not PMC members do not get binding votes."""
-    user._get_additional_admin_users.cache_clear()
-    monkeypatch.setattr("atr.config.get", lambda: MockConfig())
-    mock_app.extensions["admins"] = frozenset({"admin_user"})
+def test_pmc_member_has_binding_vote() -> None:
+    """PMC members have binding votes."""
+    committee = MockCommittee(
+        committee_members=["alice", "bob", "charlie"],
+        committers=["alice", "bob", "charlie", "dave", "eve"],
+    )
+    is_binding = user.is_committee_member(committee, "alice")  # type: ignore[arg-type]
+    assert is_binding is True
 
-    assert user.is_admin("admin_user") is True
 
-    committee = MockCommittee(
-        committee_members=["alice", "bob"],
-        committers=["alice", "bob", "charlie"],
+def test_zero_binding_vote_body_format() -> None:
+    """Zero binding votes are formatted correctly."""
+    body = vote_writer.format_vote_email_body(
+        vote="0",
+        asf_uid="voter",
+        fullname="Abstaining Voter",
+        is_binding=True,
     )
-    # admin_user is not on this PMC
-    is_pmc_member = user.is_committee_member(committee, "admin_user")  # type: ignore[arg-type]
-    assert is_pmc_member is False
+    assert body == "0 (binding) (voter) Abstaining Voter"
 
-    # Binding is determined ONLY by PMC membership, not admin status
-    is_binding = is_pmc_member
-    assert is_binding is False
+
+def test_zero_non_binding_vote_body_format() -> None:
+    """Zero non-binding votes are formatted correctly."""
+    body = vote_writer.format_vote_email_body(
+        vote="0",
+        asf_uid="voter",
+        fullname="Abstaining Voter",
+        is_binding=False,
+    )
+    assert body == "0 (voter) Abstaining Voter"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to