The AI review scripts share similar options and infrastructure.

Signed-off-by: Stephen Hemminger <[email protected]>
---
 devtools/ai/_common.py | 246 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 246 insertions(+)
 create mode 100644 devtools/ai/_common.py

diff --git a/devtools/ai/_common.py b/devtools/ai/_common.py
new file mode 100644
index 0000000000..69982cbda5
--- /dev/null
+++ b/devtools/ai/_common.py
@@ -0,0 +1,246 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2026 Stephen Hemminger
+
+"""Common utilities shared by the DPDK AI review scripts."""
+
+import argparse
+import json
+import subprocess
+import sys
+from dataclasses import dataclass
+from typing import Any, NoReturn
+from urllib.error import HTTPError, URLError
+from urllib.request import Request, urlopen
+
+# Provider configurations (model defaults; override with --model).
+PROVIDERS: dict[str, dict[str, str]] = {
+    "anthropic": {
+        "name": "Claude",
+        "endpoint": "https://api.anthropic.com/v1/messages";,
+        "default_model": "claude-sonnet-4-5-20250929",
+        "env_var": "ANTHROPIC_API_KEY",
+    },
+    "openai": {
+        "name": "ChatGPT",
+        "endpoint": "https://api.openai.com/v1/chat/completions";,
+        "default_model": "gpt-4.1",
+        "env_var": "OPENAI_API_KEY",
+    },
+    "xai": {
+        "name": "Grok",
+        "endpoint": "https://api.x.ai/v1/chat/completions";,
+        "default_model": "grok-4-1-fast-non-reasoning",
+        "env_var": "XAI_API_KEY",
+    },
+    "google": {
+        "name": "Gemini",
+        "endpoint": "https://generativelanguage.googleapis.com/v1beta/models";,
+        "default_model": "gemini-3-flash-preview",
+        "env_var": "GOOGLE_API_KEY",
+    },
+}
+
+
+def error(msg: str) -> NoReturn:
+    """Print error message to stderr and exit with status 1."""
+    print(f"Error: {msg}", file=sys.stderr)
+    sys.exit(1)
+
+
+def get_git_config(key: str) -> str | None:
+    """Return the value of a git config key, or None if unset/git missing."""
+    try:
+        result = subprocess.run(
+            ["git", "config", "--get", key],
+            capture_output=True,
+            text=True,
+            check=True,
+        )
+        return result.stdout.strip()
+    except (subprocess.CalledProcessError, FileNotFoundError):
+        return None
+
+
+def list_providers() -> NoReturn:
+    """Print available providers and exit."""
+    print("Available AI Providers:\n")
+    print(f"{'Provider':<12} {'Default Model':<30} {'API Key Variable'}")
+    print(f"{'--------':<12} {'-------------':<30} {'----------------'}")
+    for name, config in PROVIDERS.items():
+        print(f"{name:<12} {config['default_model']:<30} {config['env_var']}")
+    sys.exit(0)
+
+
+@dataclass
+class TokenUsage:
+    """Accumulated token usage across API calls."""
+
+    input_tokens: int = 0
+    output_tokens: int = 0
+    cache_creation_tokens: int = 0
+    cache_read_tokens: int = 0
+    api_calls: int = 0
+
+    def add(self, other: "TokenUsage") -> None:
+        """Accumulate usage from another TokenUsage."""
+        self.input_tokens += other.input_tokens
+        self.output_tokens += other.output_tokens
+        self.cache_creation_tokens += other.cache_creation_tokens
+        self.cache_read_tokens += other.cache_read_tokens
+        self.api_calls += other.api_calls
+
+
+def format_token_summary(usage: TokenUsage, provider: str, model: str) -> str:
+    """Format a token usage summary string."""
+    provider_label = PROVIDERS.get(provider, {}).get("name", provider)
+    lines = ["=== Token Usage Summary ==="]
+    lines.append(f"Provider:      {provider_label} ({model})")
+    lines.append(f"API calls:     {usage.api_calls}")
+    lines.append(f"Input tokens:  {usage.input_tokens:,}")
+    lines.append(f"Output tokens: {usage.output_tokens:,}")
+    if usage.cache_creation_tokens:
+        lines.append(f"Cache write:   {usage.cache_creation_tokens:,}")
+    if usage.cache_read_tokens:
+        lines.append(f"Cache read:    {usage.cache_read_tokens:,}")
+    total = usage.input_tokens + usage.output_tokens
+    lines.append(f"Total tokens:  {total:,}")
+    lines.append("=" * 27)
+    return "\n".join(lines)
+
+
+def add_token_args(parser: argparse.ArgumentParser) -> None:
+    """Add the --show-tokens flag to an ArgumentParser."""
+    parser.add_argument(
+        "--show-tokens",
+        action="store_true",
+        help="Show token usage summary on stderr after the run",
+    )
+
+
+def print_token_summary(
+    usage: TokenUsage, provider: str, model: str, show: bool
+) -> None:
+    """Print token usage summary to stderr if requested and any calls were 
made."""
+    if not show or usage.api_calls == 0:
+        return
+    print("", file=sys.stderr)
+    print(format_token_summary(usage, provider, model), file=sys.stderr)
+
+
+def _build_request_meta(
+    provider: str, api_key: str, model: str
+) -> tuple[str, dict[str, str]]:
+    """Return (url, headers) for a provider request."""
+    config = PROVIDERS[provider]
+    if provider == "anthropic":
+        return config["endpoint"], {
+            "Content-Type": "application/json",
+            "x-api-key": api_key,
+            "anthropic-version": "2023-06-01",
+        }
+    if provider == "google":
+        url = f"{config['endpoint']}/{model}:generateContent?key={api_key}"
+        return url, {"Content-Type": "application/json"}
+    # openai, xai
+    return config["endpoint"], {
+        "Content-Type": "application/json",
+        "Authorization": f"Bearer {api_key}",
+    }
+
+
+def _extract_usage(provider: str, result: dict[str, Any]) -> TokenUsage:
+    """Extract TokenUsage from a provider response."""
+    usage = TokenUsage(api_calls=1)
+    if provider == "anthropic":
+        raw = result.get("usage", {})
+        usage.input_tokens = raw.get("input_tokens", 0)
+        usage.output_tokens = raw.get("output_tokens", 0)
+        usage.cache_creation_tokens = raw.get("cache_creation_input_tokens", 0)
+        usage.cache_read_tokens = raw.get("cache_read_input_tokens", 0)
+    elif provider == "google":
+        raw = result.get("usageMetadata", {})
+        usage.input_tokens = raw.get("promptTokenCount", 0)
+        usage.output_tokens = raw.get("candidatesTokenCount", 0)
+    else:  # openai, xai
+        raw = result.get("usage", {})
+        usage.input_tokens = raw.get("prompt_tokens", 0)
+        usage.output_tokens = raw.get("completion_tokens", 0)
+        cache_details = raw.get("prompt_tokens_details", {})
+        if cache_details:
+            usage.cache_read_tokens = cache_details.get("cached_tokens", 0)
+    return usage
+
+
+def _extract_text(provider: str, result: dict[str, Any]) -> str:
+    """Extract response text from a provider response. Calls error() on 
failure."""
+    if "error" in result:
+        error(f"API error: {result['error'].get('message', result)}")
+    if provider == "anthropic":
+        content = result.get("content", [])
+        return "".join(
+            block.get("text", "") for block in content if block.get("type") == 
"text"
+        )
+    if provider == "google":
+        candidates = result.get("candidates", [])
+        if not candidates:
+            error("No response from Gemini")
+        parts = candidates[0].get("content", {}).get("parts", [])
+        return "".join(part.get("text", "") for part in parts)
+    # openai, xai
+    choices = result.get("choices", [])
+    if not choices:
+        error("No response from API")
+    return choices[0].get("message", {}).get("content", "")
+
+
+def _print_verbose_usage(usage: TokenUsage) -> None:
+    """Print per-call token details to stderr."""
+    print("=== Token Usage ===", file=sys.stderr)
+    print(f"Input tokens: {usage.input_tokens:,}", file=sys.stderr)
+    print(f"Output tokens: {usage.output_tokens:,}", file=sys.stderr)
+    if usage.cache_creation_tokens:
+        print(f"Cache creation: {usage.cache_creation_tokens:,}", 
file=sys.stderr)
+    if usage.cache_read_tokens:
+        print(f"Cache read: {usage.cache_read_tokens:,}", file=sys.stderr)
+    print("===================", file=sys.stderr)
+
+
+def send_request(
+    provider: str,
+    api_key: str,
+    model: str,
+    request_data: dict[str, Any],
+    *,
+    timeout: int = 120,
+    verbose: bool = False,
+) -> tuple[str, TokenUsage]:
+    """Send a prebuilt request to a provider and return (response_text, usage).
+
+    The caller assembles the provider-specific request body via its own
+    build_*_request helpers (the prompts differ per script). This function
+    handles transport, error reporting, and token-usage extraction.
+    """
+    url, headers = _build_request_meta(provider, api_key, model)
+    body = json.dumps(request_data).encode("utf-8")
+    req = Request(url, data=body, headers=headers)
+
+    try:
+        with urlopen(req, timeout=timeout) as response:
+            result = json.loads(response.read().decode("utf-8"))
+    except HTTPError as e:
+        error_body = e.read().decode("utf-8")
+        try:
+            error_data = json.loads(error_body)
+            error(f"API error: {error_data.get('error', error_body)}")
+        except json.JSONDecodeError:
+            error(f"API error ({e.code}): {error_body}")
+    except URLError as e:
+        if isinstance(e.reason, TimeoutError):
+            error(f"Request timed out after {timeout} seconds")
+        error(f"Connection error: {e.reason}")
+
+    usage = _extract_usage(provider, result)
+    if verbose:
+        _print_verbose_usage(usage)
+    return _extract_text(provider, result), usage
-- 
2.53.0

Reply via email to