This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-atr-experiments.git
The following commit(s) were added to refs/heads/main by this push:
new ca145ab Add an experimental worker module
ca145ab is described below
commit ca145abbb80c069b394ea86316c8cf46e28bee69
Author: Sean B. Palmer <[email protected]>
AuthorDate: Wed Feb 19 21:10:21 2025 +0200
Add an experimental worker module
---
atr/blueprints/secret/secret.py | 33 ++++
atr/db/models.py | 49 +++++-
atr/templates/secret/tasks-add-random.html | 48 ++++++
atr/worker.py | 180 +++++++++++++++++++++
...al_schema.py => 5d63e3dd5e93_initial_schema.py} | 6 +-
5 files changed, 312 insertions(+), 4 deletions(-)
diff --git a/atr/blueprints/secret/secret.py b/atr/blueprints/secret/secret.py
index 6dae033..5cb0e2c 100644
--- a/atr/blueprints/secret/secret.py
+++ b/atr/blueprints/secret/secret.py
@@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+import datetime
import json
+import secrets
import httpx
from quart import current_app, flash, redirect, render_template, request, url_for
@@ -33,6 +35,8 @@ from atr.db.models import (
ProductLine,
PublicSigningKey,
Release,
+ Task,
+ TaskStatus,
VotePolicy,
)
from atr.db.service import get_pmcs
@@ -57,6 +61,7 @@ async def secret_data(model: str = "PMC") -> str:
"DistributionChannel": DistributionChannel,
"PublicSigningKey": PublicSigningKey,
"PMCKeyLink": PMCKeyLink,
+ "Task": Task,
}
if model not in models:
@@ -187,3 +192,31 @@ async def secret_keys_delete_all() -> str:
await db_session.delete(key)
return f"Deleted {count} keys"
+
+
[email protected]("/tasks/add-random", methods=["GET", "POST"])
+async def secret_tasks_add_random() -> str | Response:
+ """Add a random task to the queue for testing."""
+ if request.method == "POST":
+ async with get_session() as db_session:
+ async with db_session.begin():
+ # Create a random task
+ task = Task(
+ id=None,
+ task_type="example",
+ task_args=json.dumps(
+ {
+ "random_number": secrets.randbelow(100),
+                        "timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
+ }
+ ),
+ status=TaskStatus.QUEUED,
+ )
+ db_session.add(task)
+ # Flush to get the task ID
+ await db_session.flush()
+ await flash(f"Added random task (ID: {task.id})", "success")
+
+ return redirect(url_for("secret_blueprint.secret_tasks_add_random"))
+
+ return await render_template("secret/tasks-add-random.html")
diff --git a/atr/db/models.py b/atr/db/models.py
index aa0e58b..f7a37d8 100644
--- a/atr/db/models.py
+++ b/atr/db/models.py
@@ -22,7 +22,7 @@ from enum import Enum
from typing import Optional
from pydantic import BaseModel
-from sqlalchemy import JSON, Column
+from sqlalchemy import JSON, CheckConstraint, Column, Index
from sqlmodel import Field, Relationship, SQLModel
@@ -184,6 +184,53 @@ class ReleasePhase(str, Enum):
ARCHIVED = "archived"
+class TaskStatus(str, Enum):
+ """Status of a task in the task queue."""
+
+ QUEUED = "queued"
+ ACTIVE = "active"
+ COMPLETED = "completed"
+ FAILED = "failed"
+
+
+class Task(SQLModel, table=True):
+ """A task in the task queue."""
+
+ id: int | None = Field(default=None, primary_key=True)
+ task_type: str
+ task_args: str = Field(sa_column=Column(JSON))
+ status: TaskStatus = Field(default=TaskStatus.QUEUED, index=True)
+    added: datetime.datetime = Field(default_factory=lambda: datetime.datetime.now(datetime.UTC), index=True)
+ started: datetime.datetime | None = None
+ completed: datetime.datetime | None = None
+ pid: int | None = None
+ error: str | None = None
+
+ # Create an index on status and added for efficient task claiming
+ __table_args__ = (
+ Index("ix_task_status_added", "status", "added"),
+ # Ensure valid status transitions:
+ # - QUEUED can transition to ACTIVE
+ # - ACTIVE can transition to COMPLETED or FAILED
+ # - COMPLETED and FAILED are terminal states
+ CheckConstraint(
+ """
+ (
+ -- Initial state is always valid
+ status = 'QUEUED'
+ -- QUEUED -> ACTIVE requires setting started time and pid
+            OR (status = 'ACTIVE' AND started IS NOT NULL AND pid IS NOT NULL)
+ -- ACTIVE -> COMPLETED requires setting completed time
+ OR (status = 'COMPLETED' AND completed IS NOT NULL)
+ -- ACTIVE -> FAILED requires setting completed time and error
+            OR (status = 'FAILED' AND completed IS NOT NULL AND error IS NOT NULL)
+ )
+ """,
+ name="valid_task_status_transitions",
+ ),
+ )
+
+
class Release(SQLModel, table=True):
storage_key: str = Field(primary_key=True)
stage: ReleaseStage
diff --git a/atr/templates/secret/tasks-add-random.html b/atr/templates/secret/tasks-add-random.html
new file mode 100644
index 0000000..f140bb7
--- /dev/null
+++ b/atr/templates/secret/tasks-add-random.html
@@ -0,0 +1,48 @@
+{% extends "layouts/base.html" %}
+
+{% block title %}
+ Add random task ~ ATR
+{% endblock title %}
+
+{% block description %}
+ Add a random task to the queue for testing.
+{% endblock description %}
+
+{% block stylesheets %}
+ {{ super() }}
+ <style>
+ .status-message {
+ margin: 1.5rem 0;
+ padding: 1rem;
+ border-radius: 4px;
+ }
+
+ .status-message.success {
+ background: #d4edda;
+ border: 1px solid #c3e6cb;
+ color: #155724;
+ }
+
+ .status-message.error {
+ background: #f8d7da;
+ border: 1px solid #f5c6cb;
+ color: #721c24;
+ }
+ </style>
+{% endblock stylesheets %}
+
+{% block content %}
+ <div class="container mt-4">
+ <h1>Add random task</h1>
+
+ {% with messages = get_flashed_messages(with_categories=true) %}
+ {% if messages %}
+      {% for category, message in messages %}<div class="status-message {{ category }}">{{ message }}</div>{% endfor %}
+ {% endif %}
+ {% endwith %}
+
+ <form method="post" class="mt-4">
+ <button type="submit" class="btn btn-primary">Add random task</button>
+ </form>
+ </div>
+{% endblock content %}
diff --git a/atr/worker.py b/atr/worker.py
new file mode 100644
index 0000000..7c4d3e2
--- /dev/null
+++ b/atr/worker.py
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""worker.py - Task worker process for ATR"""
+
+# TODO: If started is older than some threshold and status
+# is active but the pid is no longer running, we can revert
+# the task to status='QUEUED'. For this to work, ideally we
+# need to check wall clock time as well as CPU time.
+
+import datetime
+import logging
+import os
+import resource
+import signal
+import sys
+import time
+from datetime import UTC
+from typing import NoReturn
+
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import Session
+
+# Configure logging
+logging.basicConfig(
+    format="[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)s] %(message)s",
+ level=logging.INFO,
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+# Resource limits, 5 minutes and 1GB
+CPU_LIMIT_SECONDS = 300
+MEMORY_LIMIT_BYTES = 1024 * 1024 * 1024
+
+# Create database engine
+engine = create_engine("sqlite:///atr.db", echo=False)
+
+# # Create tables if they don't exist
+# SQLModel.metadata.create_all(engine)
+
+
+def get_db_session() -> Session:
+ """Get a new database session."""
+ return Session(engine)
+
+
+def claim_next_task() -> tuple[int, str, str] | None:
+ """
+ Attempt to claim the oldest unclaimed task.
+ Returns (task_id, task_type, task_args) if successful.
+ Returns None if no tasks are available.
+ """
+ with get_db_session() as session:
+ with session.begin():
+ # Find and claim the oldest unclaimed task
+ # We have an index on (status, added)
+ result = session.execute(
+ text("""
+ UPDATE task
+ SET started = :now, pid = :pid, status = 'ACTIVE'
+ WHERE id = (
+ SELECT id FROM task
+ WHERE status = 'QUEUED'
+ ORDER BY added ASC LIMIT 1
+ )
+ AND status = 'QUEUED'
+ RETURNING id, task_type, task_args
+ """),
+ {"now": datetime.datetime.now(UTC), "pid": os.getpid()},
+ )
+ task = result.first()
+ if task:
+ task_id, task_type, task_args = task
+                logger.info(f"Claimed task {task_id} ({task_type}) with args {task_args}")
+ return task_id, task_type, task_args
+
+ return None
+
+
+def process_task(task_id: int, task_type: str, task_args: str) -> None:
+ """Process a claimed task."""
+    logger.info(f"Processing task {task_id} ({task_type}) with args {task_args}")
+ try:
+ # TODO: Implement actual task processing
+ time.sleep(1)
+
+ with get_db_session() as session:
+ with session.begin():
+ session.execute(
+ text("""
+ UPDATE task
+ SET completed = :now, status = 'COMPLETED'
+ WHERE id = :task_id
+ """),
+ {"now": datetime.datetime.now(UTC), "task_id": task_id},
+ )
+ except Exception as e:
+ logger.error(f"Task {task_id} failed: {e}")
+ with get_db_session() as session:
+ with session.begin():
+ session.execute(
+ text("""
+ UPDATE task
+ SET completed = :now, status = 'FAILED', error = :error
+ WHERE id = :task_id
+ """),
+                    {"now": datetime.datetime.now(UTC), "task_id": task_id, "error": str(e)},
+ )
+
+
+def set_resource_limits() -> None:
+ """Set CPU and memory limits for this process."""
+ # Set CPU time limit
+ try:
+        resource.setrlimit(resource.RLIMIT_CPU, (CPU_LIMIT_SECONDS, CPU_LIMIT_SECONDS))
+ logger.info(f"Set CPU time limit to {CPU_LIMIT_SECONDS} seconds")
+ except ValueError as e:
+ logger.warning(f"Could not set CPU time limit: {e}")
+
+ # Set memory limit
+ try:
+        resource.setrlimit(resource.RLIMIT_AS, (MEMORY_LIMIT_BYTES, MEMORY_LIMIT_BYTES))
+ logger.info(f"Set memory limit to {MEMORY_LIMIT_BYTES} bytes")
+ except ValueError as e:
+ logger.warning(f"Could not set memory limit: {e}")
+
+
+def worker_loop() -> NoReturn:
+ """Main worker loop."""
+ logger.info(f"Worker starting (PID: {os.getpid()})")
+
+ while True:
+ try:
+ task = claim_next_task()
+ if task:
+ task_id, task_type, task_args = task
+ process_task(task_id, task_type, task_args)
+ else:
+ # No tasks available, wait 20ms before checking again
+ time.sleep(0.02)
+ except Exception as e:
+ # TODO: Should probably be more robust about this
+ logger.error(f"Worker loop error: {e}")
+ time.sleep(1)
+
+
+def signal_handler(signum: int, frame: object) -> None:
+ """Handle termination signals gracefully."""
+ # For RLIMIT_AS we'll generally get a SIGKILL
+ # For RLIMIT_CPU we'll get a SIGXCPU, which we can catch
+ logger.info(f"Received signal {signum}, shutting down...")
+ sys.exit(0)
+
+
+def main() -> None:
+ """Main entry point."""
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGINT, signal_handler)
+
+ set_resource_limits()
+ worker_loop()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/migrations/versions/b8dd95b83501_initial_schema.py b/migrations/versions/5d63e3dd5e93_initial_schema.py
similarity index 84%
rename from migrations/versions/b8dd95b83501_initial_schema.py
rename to migrations/versions/5d63e3dd5e93_initial_schema.py
index bc0deed..104e000 100644
--- a/migrations/versions/b8dd95b83501_initial_schema.py
+++ b/migrations/versions/5d63e3dd5e93_initial_schema.py
@@ -1,15 +1,15 @@
"""initial_schema
-Revision ID: b8dd95b83501
+Revision ID: 5d63e3dd5e93
Revises:
-Create Date: 2025-02-19 20:20:11.349128
+Create Date: 2025-02-19 20:56:26.175412
"""
from collections.abc import Sequence
# revision identifiers, used by Alembic.
-revision: str = "b8dd95b83501"
+revision: str = "5d63e3dd5e93"
down_revision: str | None = None
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]